package main

import (
	"delayed-task-hub/common"
	"delayed-task-hub/internal/config"
	"delayed-task-hub/internal/handler"
	"delayed-task-hub/internal/svc"
	"encoding/json"
	"flag"
	"fmt"
	"github.com/patrickmn/go-cache"
	"github.com/zeromicro/go-queue/dq"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/hash"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/rest"
	"time"
)

var configFile = flag.String("f", "etc/delaytask.yaml", "the config file")

func runConsumer(cnf dq.DqConf, ctx *svc.ServiceContext) {
	logx.Info("启动协程")
	c := cache.New(10*time.Minute, 5*time.Minute)

	consumer := dq.NewConsumer(cnf)
	consumer.Consume(func(body []byte) {
		key := hash.Md5Hex(body)
		_, found := c.Get(key)
		if found {
			// 已处理过的key被找到，于是忽略
			return
		}
		c.Set(key, nil, 10*time.Minute)
		// 在这里处理你的消息
		logx.Infof("数据=%v", string(body))
		var pkg common.Package
		err := json.Unmarshal(body, &pkg)
		if err != nil {
			logx.Errorf("格式化错误 err=%v", err)
			return
		} else {
			//_, err := ctx.Redis.Zadd(pkg.ListName, pkg.Index, fmt.Sprintf("%v", pkg.Index))
			_, err = ctx.Redis.Lpush(pkg.ListName, pkg.PayLoad)
			if err != nil {
				logx.Errorf("写入 redis 出现错误 err=%v", err)
			}
		}
		// 处理完成后保存key，防止重复处理
		// 当添加key到缓存中时，设置过期时间为10分钟

	})
}

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	logx.MustSetup(c.LogConf)
	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()

	ctx := svc.NewServiceContext(c)
	handler.RegisterHandlers(server, ctx)

	go runConsumer(c.DqConf, ctx)
	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}
